package com.mimo.push.service.impl;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

import com.mimo.common.comet.dto.ZoneDTO;
import com.mimo.common.exception.RetryableRequireException;
import com.mimo.common.logic.code.StatusCode;
import com.mimo.common.logic.dto.msg.BaseDispatchMessage;
import com.mimo.common.logic.dto.msg.DispatchMsgType;
import com.mimo.common.logic.dto.msg.Room2PeerMessage;
import com.mimo.push.binder.MessageBrokerProcessor;
import com.mimo.push.config.CometMsgPoolFactory;
import com.mimo.push.service.IPushService;
import com.mimo.push.service.IRoomService;
import com.mimo.push.service.IUserZoneService;

@Service
public class PushServiceImpl implements IPushService {

  private static final Logger log = LoggerFactory.getLogger(PushServiceImpl.class);

  private static final long MAX_EXPIRED = Duration.ofMinutes(5).toMillis();

  @Autowired
  private CometMsgPoolFactory cometMsgPoolFactory;

  @Autowired
  private IUserZoneService userZoneService;

  @Autowired
  private IRoomService roomService;

  @Resource(name = "cometMsgExecutor")
  private Executor cometMsgExecutor;

  @Resource
  private IPushService selfPushService;

  @StreamListener(MessageBrokerProcessor.INPUT)
  @Override
  public void onMessage(BaseDispatchMessage msg) {
    if (System.currentTimeMillis() - msg.getTimestamp() <= MAX_EXPIRED) { // 对于消息堆积，并超过一定时长的，直接抛弃
      CompletableFuture.runAsync(() -> selfPushService.send(msg.getTarget(), msg), cometMsgExecutor)
          .exceptionally(e -> {
            selfPushService.onFailed(msg.getTarget(), msg, e);
            return null;
          });
    } else {
      log.warn("消息[{}]接收到时已经超期", msg);
    }

  }

  /**
   * 考虑到推送过程的一些异常场景,此处对于推送结果做判断,对于comet坐落有误的，主动重试一次
   */
  @Override
  @Retryable(value = RetryableRequireException.class, maxAttempts = 1)
  public void send(String targetUserId, BaseDispatchMessage msg) {
    Assert.isTrue(Objects.equals(targetUserId, msg.getTarget()), "投递目标数据异常");

    // 如果是房间分发的，则需要查看用户与房间的关系
    if (Objects.equals(msg.getType(), DispatchMsgType.Room2Peer)) {
      Room2PeerMessage rpmsg = Room2PeerMessage.class.cast(msg);
      if (!roomService.isMemberOf(rpmsg.getRoomId(), rpmsg.getTarget())) { // 如果用户与房间没有归属，则直接抛弃
        return;
      }
    }

    ZoneDTO zone = userZoneService.getZoneByUser(targetUserId);
    if (Objects.nonNull(zone)) {
      StatusCode statusCode = StatusCode.from((cometMsgPoolFactory.getCometMessageStub(zone.getIp(), zone.getPort())
          .send(BaseDispatchMessage.convert(msg))));
      if (statusCode == StatusCode.CometUserUnlocated) { // 如果目标对象坐落的位置不对，则重试
        userZoneService.clear(targetUserId); // 清空用户的zone信息
        throw new RetryableRequireException();
      } else if (statusCode == StatusCode.UserUnavailable) { // 消息发送失败, 则一般是由连接问题，就不再重试了
        // 目标对象无法正常接收消息
      }
    } else {
      // 目标对象会话信息不存在
    }
  }

  @Override
  public void onFailed(String targetUserId, BaseDispatchMessage msg, Throwable throwable) {
    if (!(throwable instanceof RetryableRequireException)) {// 只有非retry的异常才做打印
      log.error(String.format("调用Comet推送消息失败, target:[%s] ,msg:[%s]", targetUserId, msg.getId(), throwable));
    }
  }

}
